use super::{
    Aborted, ActiveWorker, Attr, Exit, FnOnceFuture, JoinHandle, Runtime, SetAborted, SetAffinity,
    Sleep, TaskRef, Worker, Yield,
};
use crate::{Error, Result};
use core::future::Future;
use core::time::Duration;

/// 方便在同步环境中调用异步函数的主入口,
/// 异步函数执行结束后本函数才返回，即当前线程阻塞执行，因此对异步函数体无任何要求, 这是和spawn接口的最大区别
pub fn block_on_with<T>(future: T, attr: &Attr) -> Result<T::Output>
where
    T: Future,
{
    if let Ok(task) = sched(future, attr) {
        JoinHandle::<T::Output>::new(task).join()
    } else {
        Err(Error::default())
    }
}

/// 参见block_on_with, 参数attr对应为Attr::default()
pub fn block_on<T>(future: T) -> Result<T::Output>
where
    T: Future,
{
    block_on_with(future, &Attr::default())
}

/// 启动一个全新的异步执行任务. 本函数可以在异步环境中调用，也可以在同步环境中调用.
/// 和当前线程并发且结束时间不确定，因此有Send和'static要求.
/// 如果同时创建多个异步任务，建议使用JoinSet, 可以更高效的等待全部任务或者某个任务结束.
pub fn spawn_with<T>(future: T, attr: &Attr) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    if let Ok(task) = sched(future, attr) {
        JoinHandle::<T::Output>::new(task)
    } else {
        JoinHandle::<T::Output>::null()
    }
}

/// 参见spawn_with, attr为Attr::default()
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    spawn_with(future, &Attr::default())
}

/// 参见spawn_fn_with, attr为Attr::default()
pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    spawn(FnOnceFuture::new(f))
}

/// 将已有的同步函数包装为异步任务执行体.
/// 注意此函数中不要调用阻塞函数，如果必须执行阻塞函数，可以利用attr交给另一个运行时实例来完成.
/// 比如: spawn_fn_with(block_function, &Attr::new(BLOCK_RUNTIME_ID))
pub fn spawn_fn_with<F, R>(f: F, attr: &Attr) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    spawn_with(FnOnceFuture::new(f), attr)
}

/// 启动一个全新的异步执行任务，本函数只能在异步环境中调用，否则JoinHandle::join返回Err.
/// 固定在当前线程中调度，因此只有'static生命周期要求，无Send要求.
pub fn spawn_local_with<T>(future: T, attr: &Attr) -> JoinHandle<T::Output>
where
    T: Future + 'static,
    T::Output: 'static,
{
    if let Some(worker) = ActiveWorker::current() {
        if let Ok(task) = worker.spawn_local(future, attr) {
            return JoinHandle::<T::Output>::new(task);
        }
    }
    JoinHandle::<T::Output>::null()
}

/// 参见spawn_local_with, attr为Attr::default()
pub fn spawn_local<T>(future: T) -> JoinHandle<T::Output>
where
    T: Future + 'static,
    T::Output: 'static,
{
    spawn_local_with(future, &Attr::default())
}

/// 将已有的同步函数包装为异步任务执行体,在当前调度线程执行.
/// 注意函数中不应该有阻塞调用，否则会阻塞当前线程其他任务的执行.
/// 参见spawn_local_with
pub fn spawn_fn_local_with<F, R>(f: F, attr: &Attr) -> JoinHandle<R>
where
    F: FnOnce() -> R + 'static,
    R: 'static,
{
    spawn_local_with(FnOnceFuture::new(f), attr)
}

/// 参见spawn_fn_local, attr为Attr::default()
pub fn spawn_fn_local<F, R>(f: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + 'static,
    R: 'static,
{
    spawn_local(FnOnceFuture::new(f))
}

/// sleep的异步任务可以取消`JoinHandle::abort`，但资源也只有在sleep返回后才释放.
pub async fn sleep(timeout: Duration) {
    Sleep::new(timeout).await
}

/// 放弃当前调度
pub async fn yield_now() {
    Yield::new().await
}

/// 设置之后，本task只在当前的线程中被调度，避免线程间切换.
pub async fn set_affinity() {
    SetAffinity.await
}

/// 设置强制退出当前task的标志.
/// 如果Task最终返回Pending，则会被强制终止，终止之前会被调用一次，此时task_aborted().await返回true.
pub async fn task_exit() {
    Exit.await
}

/// 当前task是否被提前终止，提前终止有两种情况，一种是异步任务内部调用task_exit().await, 一种是JoinHandle::abort().
/// 注意：如果JoinHandle::abort()发生在首次调度之前，则异步任务被直接放弃，不被执行.
pub async fn task_aborted() -> bool {
    Aborted.await
}

/// 此设置后，后续的异步函数内部task_aborted().await会返回设置的值.
/// 如果发生了task切换，此状态会被丢失.
/// 如果期望退出当前task，请使用task_exit().await
pub async fn task_set_aborted(aborted: bool) {
    SetAborted(aborted).await
}

// 放宽了T的约束. 正确性由对外接口中的约束保证:
// spawn系列必须是Future + Send + 'static
// spawn_local系列必须是Future + 'static
// block_系列必须是Future.
fn sched<T: Future>(future: T, attr: &Attr) -> Result<TaskRef> {
    if let Some(worker) = ActiveWorker::current() {
        if worker.group_id() == attr.group_id {
            return worker.spawn(future, attr);
        }
    }
    Runtime::spawn(Runtime::get(attr.group_id), future, attr)
}

#[cfg(test)]
mod test {
    use crate::runtime::*;
    use core::time::Duration;
    #[test]
    fn test_future() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        async fn test_foo() -> i32 {
            100
        }
        let val = spawn(test_foo()).join().unwrap();
        assert_eq!(val, 100);
    }

    #[test]
    fn test_task_spawn() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        async fn test_foo() -> i32 {
            100
        }
        async fn test_bar() -> i32 {
            task::spawn(test_foo()).await.await.unwrap() + spawn_local(test_foo()).await.unwrap()
        }

        let val = spawn(test_bar()).join().unwrap();
        assert_eq!(val, 200);
    }

    #[test]
    fn test_sleep() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        async fn test_sleep(val: i32) -> i32 {
            sleep(core::time::Duration::new(1, 0)).await;
            yield_now().await;
            val + 100
        }
        let val = spawn(test_sleep(100)).join().unwrap();
        assert_eq!(val, 200);
    }

    #[test]
    fn test_ready() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        let val = spawn(async {
            let mut retn: i32 = 0;
            async { 2_i32 }.ready(|result: i32| retn = result).await;
            retn
        })
        .join()
        .unwrap();
        assert_eq!(val, 2);
    }

    #[test]
    fn test_select_any() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        let (r1, r2) = spawn(async {
            let mut r1: i32 = 0;
            let mut r2: i32 = 0;
            let t1 = async { 1_i32 }.ready(|result: i32| r1 = result);
            let t2 = async { 2_i32 }.ready(|result: i32| r2 = result);
            t1.or(t2).await;
            (r1, r2)
        })
        .join()
        .unwrap();
        assert_eq!(r1, 1);
        assert_eq!(r2, 0);
    }

    #[test]
    fn test_select_all() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        let (r1, r2) = spawn(async {
            let mut r1: i32 = 0;
            let mut r2: i32 = 0;
            let t1 = async { 1_i32 }.ready(|result: i32| r1 = result);
            let t2 = async { 2_i32 }.ready(|result: i32| r2 = result);
            t1.and(t2).await;
            (r1, r2)
        })
        .join()
        .unwrap();
        assert_eq!(r1, 1);
        assert_eq!(r2, 2);
    }

    #[test]
    fn test_or_ready() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        let (r1, r2, r3) = spawn(async {
            async fn foo() -> i32 {
                1
            }
            async fn bar() -> &'static str {
                "hello"
            }
            async fn baz() -> i32 {
                2
            }
            let mut foo_retn = 0;
            let mut bar_retn = "";
            let mut baz_retn = 0;
            foo()
                .ready(|val| foo_retn = val)
                .or(bar().ready(|val| bar_retn = val))
                .or(baz().ready(|val| baz_retn = val))
                .await;
            (foo_retn, bar_retn, baz_retn)
        })
        .join()
        .unwrap();
        assert_eq!(r1, 1);
        assert_eq!(r2, "");
        assert_eq!(r3, 0);
    }
    #[test]
    fn test_and_ready() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        let (r1, r2, r3) = spawn(async {
            async fn foo() -> i32 {
                1
            }
            async fn bar() -> &'static str {
                "hello"
            }
            async fn baz() -> i32 {
                2
            }
            let mut foo_retn = 0;
            let mut bar_retn = "";
            let mut baz_retn = 0;
            foo()
                .ready(|val| foo_retn = val)
                .and(bar().ready(|val| bar_retn = val))
                .or(baz().ready(|val| baz_retn = val))
                .await;
            (foo_retn, bar_retn, baz_retn)
        })
        .join()
        .unwrap();
        assert_eq!(r1, 1);
        assert_eq!(r2, "hello");
        assert_eq!(r3, 0);
    }

    #[test]
    fn test_deadline() {
        let _ = Builder::new().nth(1).max_cache(1).build();
        let beg = crate::time::now();
        async fn foo() -> Option<i32> {
            async fn fun() -> i32 {
                sleep(Duration::new(3, 0)).await;
                return 1;
            }
            fun().deadline(Duration::new(1, 0)).await
        }
        let x = spawn(foo()).join().unwrap();
        assert_eq!(None, x);
        let end = crate::time::now();
        let elapse = end - beg;
        assert!(elapse < Duration::new(2, 1000));
    }
}
